/*
 * Copyright (c) 2011-2018, Meituan Dianping. All Rights Reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dianping.zebra.single.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import com.dianping.zebra.filter.DefaultJdbcFilterChain;
import com.dianping.zebra.filter.JdbcFilter;
import com.dianping.zebra.filter.JdbcOperationCallback;
import com.dianping.zebra.filter.SQLProcessContext;
import com.dianping.zebra.group.config.datasource.entity.DataSourceConfig;
import com.dianping.zebra.util.SqlType;
import com.dianping.zebra.util.SqlUtils;

/**
 * @author hao.zhu
 */
public class SingleStatement implements Statement {

	private List<String> batchedSqls;

	protected final SingleConnection singleConnection;

	protected final List<JdbcFilter> filters;

	protected final Statement innerStatement;

	protected boolean closed;

	public SingleStatement(String dsId, SingleConnection singleConnection, Statement innnerStatement,
			List<JdbcFilter> filters) throws SQLException {
		this.singleConnection = singleConnection;
		this.innerStatement = innnerStatement;
		this.filters = filters;
	}

	@Override
	public ResultSet executeQuery(final String sql) throws SQLException {
		checkClosed();
		final String processedSql = processSQL(sql, false);
		return executeWithFilter(new JdbcOperationCallback<ResultSet>() {
			@Override
			public ResultSet doAction(Connection conn) throws SQLException {
				ResultSet rs = innerStatement.executeQuery(processedSql);
				return new SingleResultSet(rs, filters);
			}
		}, processedSql, null, false);
	}

	@Override
	public int executeUpdate(final String sql) throws SQLException {
		checkClosed();
		final String processedSql = processSQL(sql, false);
		return executeWithFilter(new JdbcOperationCallback<Integer>() {
			@Override
			public Integer doAction(Connection conn) throws SQLException {
				return innerStatement.executeUpdate(processedSql);
			}
		}, processedSql, null, false);
	}

	@Override
	public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
		checkClosed();
		final String processedSql = processSQL(sql, false);
		return executeWithFilter(new JdbcOperationCallback<Integer>() {
			@Override
			public Integer doAction(Connection conn) throws SQLException {
				return innerStatement.executeUpdate(processedSql, autoGeneratedKeys);
			}
		}, processedSql, null, false);
	}

	@Override
	public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
		checkClosed();
		final String processedSql = processSQL(sql, false);
		return executeWithFilter(new JdbcOperationCallback<Integer>() {
			@Override
			public Integer doAction(Connection conn) throws SQLException {
				return innerStatement.executeUpdate(processedSql, columnIndexes);
			}
		}, processedSql, null, false);
	}

	@Override
	public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
		checkClosed();
		final String processedSql = processSQL(sql, false);
		return executeWithFilter(new JdbcOperationCallback<Integer>() {
			@Override
			public Integer doAction(Connection conn) throws SQLException {
				return innerStatement.executeUpdate(processedSql, columnNames);
			}
		}, processedSql, null, false);
	}

	@Override
	public boolean execute(String sql) throws SQLException {
		checkClosed();
		return executeInternal(sql, -1, null, null);
	}

	@Override
	public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
		checkClosed();
		return executeInternal(sql, autoGeneratedKeys, null, null);
	}

	@Override
	public boolean execute(String sql, int[] columnIndexes) throws SQLException {
		checkClosed();
		return executeInternal(sql, -1, columnIndexes, null);
	}

	@Override
	public boolean execute(String sql, String[] columnNames) throws SQLException {
		checkClosed();
		return executeInternal(sql, -1, null, columnNames);
	}

	@Override
	public int[] executeBatch() throws SQLException {
		checkClosed();
		try {
			return executeWithFilter(new JdbcOperationCallback<int[]>() {
				@Override
				public int[] doAction(Connection conn) throws SQLException {
					return innerStatement.executeBatch();
				}
			}, null, null, true);
		} finally {
			if (batchedSqls != null) {
				batchedSqls.clear();
			}
		}
	}

	private boolean executeInternal(String sql, int autoGeneratedKeys, int[] columnIndexes, String[] columnNames)
			throws SQLException {
		SqlType sqlType = SqlUtils.getSqlType(sql);
		if (sqlType.isQuery()) {
			executeQuery(sql);
			return true;
		} else {
			if (autoGeneratedKeys == -1 && columnIndexes == null && columnNames == null) {
				executeUpdate(sql);
			} else if (autoGeneratedKeys != -1) {
				executeUpdate(sql, autoGeneratedKeys);
			} else if (columnIndexes != null) {
				executeUpdate(sql, columnIndexes);
			} else if (columnNames != null) {
				executeUpdate(sql, columnNames);
			} else {
				executeUpdate(sql);
			}

			return false;
		}
	}

	protected String processSQL(final String sql, boolean isPreparedStmt) throws SQLException {
		if (filters != null && filters.size() > 0) {
			JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
				@Override
				public String processSQL(DataSourceConfig dsConfig, SQLProcessContext ctx, JdbcFilter chain)
						throws SQLException {
					if (index < filters.size()) {
						return filters.get(index++).processSQL(dsConfig, ctx, chain);
					} else {
						return sql;
					}
				}
			};

			return chain.processSQL(singleConnection.getDataSource().getConfig(), new SQLProcessContext(isPreparedStmt),
					chain);
		}

		return sql;
	}

	@SuppressWarnings({ "unchecked", "hiding" })
	protected <T> T executeWithFilter(final JdbcOperationCallback<T> callback, final String sql, Object params,
			boolean isBatch) throws SQLException {
		if (filters != null && filters.size() > 0) {
			JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
				@Override
				public <T> T executeSingleStatement(SingleStatement source, SingleConnection conn, String sql,
						List<String> batchedSql, boolean isBatched, boolean autoCommit, Object params, JdbcFilter chain)
						throws SQLException {
					if (index < filters.size()) {
						return filters.get(index++).executeSingleStatement(source, conn, sql, (List<String>) batchedSql,
								isBatched, autoCommit, params, chain);
					} else {
						return (T) executeWithFilterOrigin(callback, conn);
					}
				}
			};

			return chain.executeSingleStatement(this, this.singleConnection, sql, null, isBatch,
					this.singleConnection.getAutoCommit(), params, chain);
		} else {
			return executeWithFilterOrigin(callback, this.singleConnection);
		}
	}

	private <T> T executeWithFilterOrigin(JdbcOperationCallback<T> callback, Connection conn) throws SQLException {
		return callback.doAction(conn);
	}

	@Override
	public void addBatch(String sql) throws SQLException {
		if (batchedSqls == null) {
			batchedSqls = new ArrayList<String>();
		}

		if (sql != null) {
			batchedSqls.add(sql);
		}

		this.innerStatement.addBatch(sql);
	}

	@Override
	public void clearBatch() throws SQLException {
		this.innerStatement.clearBatch();
	}

	@Override
	public ResultSet getGeneratedKeys() throws SQLException {
		return this.innerStatement.getGeneratedKeys();
	}

	@Override
	public Connection getConnection() throws SQLException {
		return singleConnection;
	}

	@Override
	public void cancel() throws SQLException {
		this.innerStatement.cancel();
	}

	@Override
	public void close() throws SQLException {
		if (closed) {
			return;
		}
		if (this.innerStatement != null) {
			this.innerStatement.close();
		}
		closed = true;
	}

	protected void checkClosed() throws SQLException {
		if (this.closed) {
			throw new SQLException("Operation not supported after statement closed.");
		}
	}

	@Override
	public int getMaxFieldSize() throws SQLException {
		return this.innerStatement.getMaxFieldSize();
	}

	@Override
	public void setMaxFieldSize(int max) throws SQLException {
		this.innerStatement.setMaxFieldSize(max);
	}

	@Override
	public int getMaxRows() throws SQLException {
		return this.innerStatement.getMaxRows();
	}

	@Override
	public void setMaxRows(int max) throws SQLException {
		this.innerStatement.setMaxRows(max);
	}

	@Override
	public void setEscapeProcessing(boolean enable) throws SQLException {
		this.innerStatement.setEscapeProcessing(enable);
	}

	@Override
	public int getQueryTimeout() throws SQLException {
		return this.innerStatement.getQueryTimeout();
	}

	@Override
	public void setQueryTimeout(int seconds) throws SQLException {
		this.innerStatement.setQueryTimeout(seconds);
	}

	@Override
	public SQLWarning getWarnings() throws SQLException {
		return this.innerStatement.getWarnings();
	}

	@Override
	public void clearWarnings() throws SQLException {
		this.innerStatement.clearWarnings();
	}

	@Override
	public void setCursorName(String name) throws SQLException {
		this.innerStatement.setCursorName(name);
	}

	@Override
	public ResultSet getResultSet() throws SQLException {
		return this.innerStatement.getResultSet();
	}

	@Override
	public int getUpdateCount() throws SQLException {
		return this.innerStatement.getUpdateCount();
	}

	@Override
	public boolean getMoreResults(int current) throws SQLException {
		return this.innerStatement.getMoreResults(current);
	}

	@Override
	public boolean getMoreResults() throws SQLException {
		return this.innerStatement.getMoreResults();
	}

	@Override
	public int getFetchDirection() throws SQLException {
		return this.innerStatement.getFetchDirection();
	}

	@Override
	public void setFetchDirection(int direction) throws SQLException {
		this.innerStatement.setFetchDirection(direction);
	}

	@Override
	public int getFetchSize() throws SQLException {
		return this.innerStatement.getFetchSize();
	}

	@Override
	public void setFetchSize(int rows) throws SQLException {
		this.innerStatement.setFetchSize(rows);
	}

	@Override
	public int getResultSetConcurrency() throws SQLException {
		return this.innerStatement.getResultSetConcurrency();
	}

	@Override
	public int getResultSetType() throws SQLException {
		return this.innerStatement.getResultSetType();
	}

	@Override
	public int getResultSetHoldability() throws SQLException {
		return this.innerStatement.getResultSetHoldability();
	}

	@Override
	public boolean isClosed() throws SQLException {
		return this.innerStatement.isClosed();
	}

	@Override
	public boolean isPoolable() throws SQLException {
		return this.innerStatement.isPoolable();
	}

	@Override
	public void setPoolable(boolean poolable) throws SQLException {
		this.innerStatement.setPoolable(poolable);
	}

	public void closeOnCompletion() throws SQLException {
		throw new UnsupportedOperationException("zebra does not support closeOnCompletion");
	}

	public boolean isCloseOnCompletion() throws SQLException {
		throw new UnsupportedOperationException("zebra does not support isCloseOnCompletion");
	}

	@Override
	public <T> T unwrap(Class<T> iface) throws SQLException {
		return this.innerStatement.unwrap(iface);
	}

	@Override
	public boolean isWrapperFor(Class<?> iface) throws SQLException {
		return this.innerStatement.isWrapperFor(iface);
	}
}